---
id: event-brokers
sidebar_label: Event Brokers
title: Event Brokers
description: Find out how open source chatbot framework Rasa allows you to stream events to a message broker.
---

An event broker allows you to connect your running assistant to other services that process the data coming
in from conversations. The event broker publishes messages to a message streaming service,
also known as a message broker, to forward Rasa [Events](./action-server/events.mdx) from the Rasa server to other services.

## Format

All events are streamed to the broker as serialized dictionaries every time
the tracker updates its state. An example event emitted from the `default`
tracker looks like this:

```json
{
    "sender_id": "default",
    "timestamp": 1528402837.617099,
    "event": "bot",
    "text": "what your bot said",
    "data": "some data about e.g. attachments",
    "metadata": {
        "a key": "a value"
    }
}
```

The `event` field takes the event's `type_name` (for more on event
types, check out the [events](./action-server/events.mdx) docs).
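
To make the format concrete, here is a minimal sketch of how a downstream service might decode a serialized event and branch on its `event` field. The `describe_event` helper is hypothetical and not part of Rasa, and the payload simply mirrors the example above.

```python
import json
from datetime import datetime, timezone


def describe_event(raw: str) -> str:
    """Decode one serialized event and summarize it (hypothetical helper)."""
    event = json.loads(raw)
    # The timestamp is treated as a Unix epoch in seconds
    when = datetime.fromtimestamp(event["timestamp"], tz=timezone.utc)
    kind = event["event"]
    # Only message events carry a `text` field in this sketch
    detail = event.get("text") if kind in ("bot", "user") else None
    return f"{when:%Y-%m-%d} [{event['sender_id']}] {kind}: {detail}"


raw_event = json.dumps(
    {
        "sender_id": "default",
        "timestamp": 1528402837.617099,
        "event": "bot",
        "text": "what your bot said",
        "metadata": {"a key": "a value"},
    }
)
summary = describe_event(raw_event)
print(summary)
```

A real consumer would likely handle more event types; the branch on `kind` is where that logic would go.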

## Pika Event Broker

The example implementation we're going to show you here uses
[Pika](https://pika.readthedocs.io), the Python client library for
[RabbitMQ](https://www.rabbitmq.com).

### Adding a Pika Event Broker Using the Endpoint Configuration

You can instruct Rasa to stream all events to your Pika event broker by adding an `event_broker` section to your
`endpoints.yml`:

```yaml-rasa (docs/sources/data/test_endpoints/event_brokers/pika_endpoint.yml)
```

A comprehensive list of all arguments that can be customized in the `endpoints.yml` file can be found in the [reference documentation](https://rasa.com/docs/rasa/reference/rasa/core/brokers/pika/#__init__).
Rasa will automatically start streaming events when you restart the Rasa server.

### Adding SSL options to the Pika Event Broker

You can create RabbitMQ SSL options by setting the following required environment variables:
- `RABBITMQ_SSL_CLIENT_CERTIFICATE`: path to the SSL client certificate
- `RABBITMQ_SSL_CLIENT_KEY`: path to the SSL client key

Please note that specifying `RABBITMQ_SSL_CA_FILE` or `RABBITMQ_SSL_KEY_PASSWORD` via environment variables
is no longer supported. Use a key file that is not encrypted instead.

### Adding a Pika Event Broker in Python

Here is how you add it using Python code:

```python
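import asyncio

from rasa.core.brokers.pika import PikaEventBroker
from rasa.core.tracker_store import InMemoryTrackerStore
from rasa.shared.core.domain import Domain

# Load the assistant's domain; adjust the path to match your project
domain = Domain.load("domain.yml")

# Create the event loop the broker will use for its connection
event_loop = asyncio.new_event_loop()

pika_broker = PikaEventBroker(
    "localhost",
    "username",
    "password",
    queues=["rasa_events"],
    event_loop=event_loop,
)
# Connect on the same loop that was passed to the broker
event_loop.run_until_complete(pika_broker.connect())

tracker_store = InMemoryTrackerStore(domain=domain, event_broker=pika_broker)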
```

### Implementing a Pika Event Consumer

You need to have a RabbitMQ server running, as well as another application
that consumes the events. This consumer needs to register a callback function
on a Pika channel and then call the channel's `start_consuming()` method.
Here's a simple example:

```python
import json
import pika


def _callback(ch, method, properties, body):
    # Do something useful with your incoming message body here, e.g.
    # saving it to a database
    print("Received event {}".format(json.loads(body)))


if __name__ == "__main__":

    # RabbitMQ credentials with username and password
    credentials = pika.PlainCredentials("username", "password")

    # Pika connection to the RabbitMQ host - typically 'rabbit' in a
    # docker environment, or 'localhost' in a local environment
    connection = pika.BlockingConnection(
        pika.ConnectionParameters("rabbit", credentials=credentials)
    )

    # start consumption of channel
    channel = connection.channel()
    channel.basic_consume(queue="rasa_events", on_message_callback=_callback, auto_ack=True)
    channel.start_consuming()
```

### Sending Events to Multiple Queues

You can specify multiple event queues to publish events to, for example by listing them in the `queues` argument.
This should work for all message brokers supported by Pika (e.g. RabbitMQ).

## Kafka Event Broker

While RabbitMQ is the default event broker, it is possible to use [Kafka](https://kafka.apache.org/) as the main broker for your
events. Rasa uses the [confluent-kafka](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#) library, a Kafka client for Python.
You will need a running Kafka server.

### Partition Key

Rasa's Kafka producer can optionally be configured to partition messages by conversation ID.
To enable this, set `partition_by_sender` to `True` in the `endpoints.yml` file.
By default, this parameter is set to `False` and the producer will randomly assign a partition to each message.

```yaml-rasa title="endpoints.yml"
event_broker:
  type: kafka
  partition_by_sender: True
  security_protocol: PLAINTEXT
  topic: topic
  url: localhost
  client_id: kafka-python-rasa
```
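
The effect of keyed partitioning can be illustrated with a small, self-contained sketch. It is not the partitioner that Kafka clients actually use: `pick_partition` is a hypothetical helper, and CRC32 modulo the partition count is an assumed stand-in for a deterministic key hash. The point is only that the same conversation ID always maps to the same partition, so the events of one conversation stay in order relative to each other.

```python
import zlib


def pick_partition(sender_id: str, num_partitions: int) -> int:
    """Map a conversation ID to a partition deterministically (illustrative only)."""
    return zlib.crc32(sender_id.encode("utf-8")) % num_partitions


events = [
    {"sender_id": "alice", "event": "user"},
    {"sender_id": "bob", "event": "user"},
    {"sender_id": "alice", "event": "bot"},
]

# Group events by the partition their conversation ID hashes to
partitions = {}
for event in events:
    index = pick_partition(event["sender_id"], num_partitions=3)
    partitions.setdefault(index, []).append(event)

# Both of alice's events share one partition, in their original order
alice_partition = partitions[pick_partition("alice", 3)]
alice_events = [e["event"] for e in alice_partition if e["sender_id"] == "alice"]
print(alice_events)
```

With random assignment, by contrast, the two events for `alice` could land in different partitions and be consumed out of order.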

### Authentication and Authorization

Rasa's Kafka producer accepts the following types of security protocols: `SASL_PLAINTEXT`, `SSL`, `PLAINTEXT`
and `SASL_SSL`.

For development environments, or if the broker servers and clients are located
on the same machine, you can use simple authentication with `SASL_PLAINTEXT` or `PLAINTEXT`.
With these protocols, the credentials and messages exchanged between the clients and servers
are sent in plaintext. This is not the most secure approach, but since it is simple
to configure, it is useful for simple cluster configurations.
The `SASL_PLAINTEXT` protocol requires the `username` and `password`
previously configured in the broker server.

If the clients or the brokers in the Kafka cluster are located on different
machines, it's important to use the `SSL` or `SASL_SSL` protocol to ensure encryption of data
and client authentication. After generating valid certificates for the brokers and the
clients, the path to the certificate and key generated for the producer must
be provided as arguments, as well as the CA's root certificate.

When using the `SASL_PLAINTEXT` and `SASL_SSL` protocols, the `sasl_mechanism` can be
optionally configured and is set to `PLAIN` by default. Valid values for `sasl_mechanism`
are: `PLAIN`, `GSSAPI`, `OAUTHBEARER`, `SCRAM-SHA-256`, and `SCRAM-SHA-512`.

If `GSSAPI` is used for the `sasl_mechanism`, you will need to additionally install
[python-gssapi](https://pypi.org/project/python-gssapi/) and the necessary C library
Kerberos dependencies.

If the `ssl_check_hostname` parameter is enabled, the clients will verify
that the broker's hostname matches the certificate. It's used on client connections
and inter-broker connections to prevent man-in-the-middle attacks.

### Adding a Kafka Event Broker Using the Endpoint Configuration

You can instruct Rasa to stream all events to your Kafka event broker by adding an `event_broker` section to your
`endpoints.yml`.

If using the `SASL_PLAINTEXT` protocol, the endpoints file must have the following entries:

```yaml-rasa (docs/sources/data/test_endpoints/event_brokers/kafka_sasl_plaintext_endpoint.yml)
```

If using the `PLAINTEXT` protocol, the endpoints file must have the following entries:

```yaml-rasa (docs/sources/data/test_endpoints/event_brokers/kafka_plaintext_endpoint.yml)
```

If using the `SSL` protocol, the endpoints file should look like:

```yaml-rasa (docs/sources/data/test_endpoints/event_brokers/kafka_ssl_endpoint.yml)
```

If using the `SASL_SSL` protocol, the endpoints file should look like:

```yaml-rasa (docs/sources/data/test_endpoints/event_brokers/kafka_sasl_ssl_endpoint.yml)
```

### Sending Events to Multiple Queues

The Kafka event broker does not allow you to configure multiple topics.

However, multiple consumers can read from the same topic as long as they are in different consumer groups.
Each consumer group processes all events independently of the others
(in a sense, each group has its own reference to the last event it has processed). For details, see [Kafka: The Definitive Guide](https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html#:~:text=Kafka%20consumers%20are%20typically%20part,the%20partitions%20in%20the%20topic.).
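
The consumer group behaviour can be sketched without a Kafka cluster. The simulation below is a toy under stated assumptions, not Kafka client code: the `Topic` class and the group names are hypothetical, and a single in-memory list stands in for a topic with one partition. It shows that each group keeps its own offset, so every group sees every event.

```python
from collections import defaultdict
from typing import Any, Dict, List


class Topic:
    """Toy single-partition topic tracking one offset per consumer group."""

    def __init__(self) -> None:
        self.log: List[Dict[str, Any]] = []
        self.offsets: Dict[str, int] = defaultdict(int)

    def publish(self, event: Dict[str, Any]) -> None:
        self.log.append(event)

    def poll(self, group: str) -> List[Dict[str, Any]]:
        """Return events the group has not seen yet and advance its offset."""
        start = self.offsets[group]
        self.offsets[group] = len(self.log)
        return self.log[start:]


topic = Topic()
topic.publish({"sender_id": "default", "event": "user"})
topic.publish({"sender_id": "default", "event": "bot"})

analytics_batch = topic.poll("analytics")  # first group reads both events
topic.publish({"sender_id": "default", "event": "action"})
archive_batch = topic.poll("archive")  # second group still starts at offset 0
analytics_next = topic.poll("analytics")  # only the new event remains
print(len(analytics_batch), len(archive_batch), len(analytics_next))
```

Reading with the `analytics` group never affects what the `archive` group receives, which is what makes consumer groups a workable substitute for multiple queues.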

## SQL Event Broker

It is possible to use an SQL database as an event broker. Connections to databases are established using
[SQLAlchemy](https://www.sqlalchemy.org/), a Python library which can interact with many
different types of SQL databases, such as [SQLite](https://sqlite.org/index.html),
[PostgreSQL](https://www.postgresql.org/) and more. The default Rasa installation allows connections to SQLite
and PostgreSQL databases. For other options, see the
[SQLAlchemy documentation on SQL dialects](https://docs.sqlalchemy.org/en/13/dialects/index.html).

### Adding a SQL Event Broker Using the Endpoint Configuration

To instruct Rasa to save all events to your SQL event broker, add an `event_broker` section to your
`endpoints.yml`. For example, a valid SQLite configuration
could look like this:

```yaml-rasa title="endpoints.yml"
event_broker:
  type: SQL
  dialect: sqlite
  db: events.db
```

PostgreSQL databases can be used as well:

```yaml-rasa title="endpoints.yml"
event_broker:
  type: SQL
  url: 127.0.0.1
  port: 5432
  dialect: postgresql
  username: myuser
  password: mypassword
  db: mydatabase
```

With this configuration applied, Rasa will create a table called `events` in the database,
where all events will be added.
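
Once events are in the database, another service can query them with ordinary SQL. The sketch below uses Python's built-in `sqlite3` module against an in-memory database that it creates itself. The column layout (`id`, `sender_id`, and a `data` column holding the serialized event) is an assumption made for illustration, so inspect the `events` table in your own database before relying on it.

```python
import json
import sqlite3

# In-memory database standing in for events.db; the schema is an assumption
connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE events (id INTEGER PRIMARY KEY, sender_id TEXT, data TEXT)"
)
sample = {"sender_id": "default", "event": "bot", "text": "what your bot said"}
connection.execute(
    "INSERT INTO events (sender_id, data) VALUES (?, ?)",
    (sample["sender_id"], json.dumps(sample)),
)

# Read back every event stored for one conversation, oldest first
rows = connection.execute(
    "SELECT data FROM events WHERE sender_id = ? ORDER BY id", ("default",)
).fetchall()
stored_events = [json.loads(data) for (data,) in rows]
print(stored_events[0]["event"])
connection.close()
```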

## FileEventBroker

It is possible to use the `FileEventBroker` as an event broker. This implementation will log events to a file in JSON format.
You can provide a `path` key in the `endpoints.yml` file if you wish to override the default file name, `rasa_event.log`.
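
Reading the log back is straightforward if each line holds one JSON document. That layout is an assumption in the sketch below, as is the hypothetical `read_events` helper; the sample file is written by the sketch itself rather than by Rasa.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, Iterator


def read_events(path: Path) -> Iterator[Dict[str, Any]]:
    """Yield one event per non-empty line (assumes one JSON object per line)."""
    with path.open(encoding="utf-8") as log_file:
        for line in log_file:
            if line.strip():
                yield json.loads(line)


with tempfile.TemporaryDirectory() as directory:
    # Write a small sample log so the sketch runs on its own
    log_path = Path(directory) / "rasa_event.log"
    log_path.write_text(
        '{"sender_id": "default", "event": "user", "text": "hi"}\n'
        '{"sender_id": "default", "event": "bot", "text": "hello"}\n',
        encoding="utf-8",
    )
    events = list(read_events(log_path))

print([event["event"] for event in events])
```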

## Custom Event Broker

If you need an event broker which is not available out of the box, you can implement your own.
This is done by extending the base class `EventBroker`.

Your custom event broker class must also implement the following base class methods:
- `from_endpoint_config`: creates an `EventBroker` object from the endpoint configuration.
[(source code - see for signature)](https://github.com/RasaHQ/rasa/blob/main/rasa/core/brokers/broker.py#L45).
- `publish`: publishes a JSON-formatted [Rasa event](https://rasa.com/docs/rasa/reference/rasa/shared/core/events/) into an event queue.
[(source code - see for signature)](https://github.com/RasaHQ/rasa/blob/main/rasa/core/brokers/broker.py#L63).
- `is_ready`: determines whether or not the event broker is ready. [(source code - see for signature)](https://github.com/RasaHQ/rasa/blob/main/rasa/core/brokers/broker.py#L67).
- `close`: closes the connection to an event broker. [(source code - see for signature)](https://github.com/RasaHQ/rasa/blob/main/rasa/core/brokers/broker.py#L75).
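
The overall shape of such a class can be sketched as follows. To keep the example runnable without Rasa installed, `InMemoryEventBroker` is a hypothetical stand-in that does not inherit from `EventBroker`, and its method signatures are simplified assumptions (for instance, the configuration is a plain dictionary and all methods are synchronous). Check the linked source for the exact signatures in your Rasa version.

```python
from typing import Any, Dict, List, Optional


class InMemoryEventBroker:
    """Hypothetical broker that keeps events in a list.

    A real implementation would subclass Rasa's `EventBroker`.
    """

    def __init__(self, max_events: int = 100) -> None:
        self.max_events = max_events
        self.events: List[Dict[str, Any]] = []
        self._open = True

    @classmethod
    def from_endpoint_config(
        cls, broker_config: Optional[Dict[str, Any]]
    ) -> Optional["InMemoryEventBroker"]:
        # No configuration means no broker
        if broker_config is None:
            return None
        return cls(max_events=broker_config.get("max_events", 100))

    def publish(self, event: Dict[str, Any]) -> None:
        # Drop the oldest event once the buffer is full
        if len(self.events) >= self.max_events:
            self.events.pop(0)
        self.events.append(event)

    def is_ready(self) -> bool:
        return self._open

    def close(self) -> None:
        self._open = False


broker = InMemoryEventBroker.from_endpoint_config({"max_events": 2})
for name in ("user", "bot", "action"):
    broker.publish({"sender_id": "default", "event": name})
buffered = [event["event"] for event in broker.events]
print(buffered)
broker.close()
```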

### Configuration

Put the module path to your custom event broker and the parameters you require in your `endpoints.yml`:

```yaml-rasa title="endpoints.yml"
event_broker:
  type: path.to.your.module.Class
  url: localhost
  a_parameter: a value
  another_parameter: another value
```
